Model Monitoring

Installs: 107
Rank: #7916

Install

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill 'Model Monitoring'
Model Monitoring
Overview
Monitoring deployed machine learning models ensures they continue to perform well in production by detecting data drift, concept drift, and performance degradation.
When to Use
When models are deployed in production environments serving real users
When detecting data drift or concept drift in input features
When tracking model performance metrics over time
When ensuring model reliability, accuracy, and operational health
When implementing ML observability and alerting systems
When establishing thresholds for model retraining or intervention
Monitoring Components
Performance Metrics
Accuracy, latency, throughput
Data Drift
Distribution changes in input features
Concept Drift
Changes in target variable relationships
Output Drift
Changes in prediction distribution
Feature Drift
Individual feature distribution changes
Anomaly Detection
Unusual samples in production
Monitoring Tools
Prometheus
Metrics collection and storage
Grafana
Visualization and dashboarding
MLflow
Model tracking and registry
TensorFlow Data Validation
Data statistics
Evidently
Drift detection and monitoring
Great Expectations
Data quality assertions
Python Implementation

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy import stats
import json
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Production Monitoring System ===")


class ModelMonitoringSystem:
    def __init__(self, model, scaler, baseline_data, baseline_targets):
        self.model = model
        self.scaler = scaler
        self.baseline_data = baseline_data
        self.baseline_targets = baseline_targets
        self.baseline_mean = baseline_data.mean(axis=0)
        self.baseline_std = baseline_data.std(axis=0)
        self.baseline_predictions = model.predict(baseline_data)
        self.metrics_history = []
        self.drift_alerts = []
        self.performance_history = []

    def log_predictions(self, X, y_true, y_pred):
        """Log predictions and compute metrics."""
        timestamp = datetime.now()
        # Compute metrics
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        metric_record = {
            'timestamp': timestamp,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'n_samples': len(X),
        }
        self.metrics_history.append(metric_record)
        return metric_record

    def detect_data_drift(self, X_new):
        """Detect data drift using the Kolmogorov-Smirnov test."""
        drift_detected = False
        drift_features = []
        for feature_idx in range(X_new.shape[1]):
            baseline_feature = self.baseline_data[:, feature_idx]
            new_feature = X_new[:, feature_idx]
            # KS test
            ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)
            if p_value < 0.05:
                # Significant drift detected
                drift_detected = True
                drift_features.append({
                    'feature_index': feature_idx,
                    'ks_statistic': float(ks_statistic),
                    'p_value': float(p_value),
                })
        if drift_detected:
            alert = {
                'timestamp': datetime.now(),
                'type': 'data_drift',
                'severity': 'high',
                'drifted_features': drift_features,
                'n_drifted': len(drift_features),
            }
            self.drift_alerts.append(alert)
            logger.warning(f"Data drift detected in {len(drift_features)} features")
        return drift_detected, drift_features

    def detect_output_drift(self, y_pred_new):
        """Detect drift in the distribution of model predictions."""
        baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
        new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)
        # Compare distributions with a chi-square-style statistic
        classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
        chi2_stat = 0.0
        for cls in classes:
            exp = baseline_pred_dist.get(cls, 0.01)
            obs = new_pred_dist.get(cls, 0.01)
            chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)
        p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)
        if p_value < 0.05:
            alert = {
                'timestamp': datetime.now(),
                'type': 'output_drift',
                'severity': 'medium',
                'chi2_statistic': float(chi2_stat),
                'p_value': float(p_value),
            }
            self.drift_alerts.append(alert)
            logger.warning("Output drift detected in predictions")
            return True
        return False

    def detect_performance_degradation(self, y_true, y_pred):
        """Detect whether model performance has degraded."""
        current_accuracy = accuracy_score(y_true, y_pred)
        baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)
        degradation_threshold = 0.05  # 5% drop
        degradation = baseline_accuracy - current_accuracy
        if degradation > degradation_threshold:
            alert = {
                'timestamp': datetime.now(),
                'type': 'performance_degradation',
                'severity': 'critical',
                'baseline_accuracy': float(baseline_accuracy),
                'current_accuracy': float(current_accuracy),
                'degradation': float(degradation),
            }
            self.drift_alerts.append(alert)
            logger.error("Performance degradation detected")
            return True
        return False

    def get_monitoring_report(self):
        """Generate a monitoring report."""
        if not self.metrics_history:
            return {}
        metrics_df = pd.DataFrame(self.metrics_history)
        return {
            'monitoring_period': {
                'start': self.metrics_history[0]['timestamp'].isoformat(),
                'end': self.metrics_history[-1]['timestamp'].isoformat(),
                'n_batches': len(self.metrics_history),
            },
            'performance_summary': {
                'avg_accuracy': float(metrics_df['accuracy'].mean()),
                'min_accuracy': float(metrics_df['accuracy'].min()),
                'max_accuracy': float(metrics_df['accuracy'].max()),
                'std_accuracy': float(metrics_df['accuracy'].std()),
                'avg_f1': float(metrics_df['f1'].mean()),
            },
            'drift_summary': {
                'total_alerts': len(self.drift_alerts),
                'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
                'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
                'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation'),
            },
            'alerts': self.drift_alerts[-10:],  # Last 10 alerts
        }


print("Monitoring system initialized")

# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")
from sklearn.datasets import make_classification

# Create baseline data
X_baseline, y_baseline = make_classification(
    n_samples=1000, n_features=20, n_informative=15, random_state=42
)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)
print(f"Baseline model trained on {len(X_baseline)} samples")

# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)

# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")

# Simulate normal production data
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)
metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")

# Simulate drifted data (different distribution)
X_prod_drift = np.random.randn(500, 20) * 2.0
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)
metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")

# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")

# 5. Prometheus metrics export
print("\n=== 4. Prometheus Metrics Format ===")
prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{{timestamp="2024-01-01"}} {metrics_normal['accuracy']:.4f}

# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{{timestamp="2024-01-01"}} {metrics_normal['f1']:.4f}

# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000

# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}

# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""
print("Prometheus metrics:")
print(prometheus_metrics)

# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")
grafana_dashboard = {
    'title': 'ML Model Monitoring Dashboard',
    'panels': [
        {'title': 'Model Accuracy', 'targets': [{'metric': 'model_accuracy'}], 'type': 'graph'},
        {'title': 'Data Drift Alerts', 'targets': [{'metric': 'model_drift_detected'}], 'type': 'stat'},
        {'title': 'Prediction Latency', 'targets': [{'metric': 'model_latency_seconds'}], 'type': 'graph'},
        {'title': 'Feature Distributions', 'targets': [{'metric': 'feature_distribution_change'}], 'type': 'heatmap'},
    ],
}
print("Grafana dashboard configured with 4 panels")

# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Precision, recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Feature distributions (baseline vs. current)
feature_stats = pd.DataFrame({
    'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
    'Current Mean': np.mean(X_prod_drift, axis=0)[:10],
})
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Baseline Mean'], alpha=0.7, label='Baseline')
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Current Mean'], alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')

# Alert type distribution
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values,
                color=['red', 'orange', 'yellow'][:len(alert_counts)])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')

plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")

# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))
print("\nModel monitoring system setup completed!")
```
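The Prometheus block assembled in section 4 is an instance of the Prometheus text exposition format (`# HELP` and `# TYPE` lines followed by `name{labels} value` samples). A stdlib-only sketch of a renderer for one gauge; the `render_gauge` helper is illustrative, and in production the official prometheus_client library would normally generate this format for you:

```python
def render_gauge(name, help_text, value, labels=None):
    """Render one metric in the Prometheus text exposition format:
    a # HELP line, a # TYPE line, then name{labels} value."""
    label_str = ""
    if labels:
        # Label pairs are sorted for a deterministic output order
        pairs = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        label_str = "{" + pairs + "}"
    return (f"# HELP {name} {help_text}\n"
            f"# TYPE {name} gauge\n"
            f"{name}{label_str} {value}\n")

print(render_gauge("model_accuracy", "Model accuracy score",
                   0.9123, {"model_version": "v1"}))
```

A scrape target would concatenate one such block per metric and serve the result over HTTP for Prometheus to collect.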
Drift Detection Techniques
Kolmogorov-Smirnov Test
Statistical test for distribution change
Population Stability Index
Measures feature distribution shifts
Chi-square Test
Categorical feature drift
Wasserstein Distance
Optimal transport-based distance
Jensen-Shannon Divergence
Information-theoretic metric
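The implementation above uses only the KS test; the other techniques listed here are a few lines of NumPy/SciPy each. A minimal sketch of the Population Stability Index and the Wasserstein distance on a single drifted feature (the `population_stability_index` helper and its bin count are illustrative choices, not from the original implementation):

```python
import numpy as np
from scipy.stats import wasserstein_distance

def population_stability_index(baseline, current, bins=10):
    """PSI between two samples of one feature.

    Bin edges are fixed from the baseline; clipping avoids
    log(0) and division by zero in empty bins.
    """
    edges = np.histogram_bin_edges(baseline, bins=bins)
    p, _ = np.histogram(baseline, bins=edges)
    q, _ = np.histogram(current, bins=edges)
    p = np.clip(p / p.sum(), 1e-6, None)
    q = np.clip(q / q.sum(), 1e-6, None)
    return float(np.sum((p - q) * np.log(p / q)))

rng = np.random.default_rng(0)
baseline = rng.normal(0.0, 1.0, 5000)
shifted = rng.normal(0.5, 1.0, 5000)  # mean shift simulates drift

psi = population_stability_index(baseline, shifted)
wd = wasserstein_distance(baseline, shifted)
print(f"PSI: {psi:.3f}, Wasserstein distance: {wd:.3f}")
```

A common rule of thumb treats PSI below 0.1 as stable, 0.1-0.25 as moderate shift, and above 0.25 as a major shift, though thresholds should be calibrated per feature.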
Alert Thresholds
Critical
>5% accuracy drop, immediate action
High
Data drift in 3+ features, investigation needed
Medium
Output drift, monitoring increased
Low
Single feature drift, log and track
Deliverables
Monitoring dashboards
Alert configuration
Drift detection reports
Performance degradation analysis
Retraining recommendations
Action playbook
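The severity tiers above can be encoded as a simple triage rule evaluated on each monitoring batch; a minimal sketch, where the `classify_alert` name and signature are illustrative rather than part of the original implementation:

```python
def classify_alert(accuracy_drop, n_drifted_features, output_drift):
    """Map monitoring signals to the severity tiers described above.

    Thresholds follow the document: a >5% accuracy drop is critical,
    drift in 3+ features is high, output drift is medium, and a
    single drifted feature is low.
    """
    if accuracy_drop > 0.05:
        return 'critical'  # immediate action
    if n_drifted_features >= 3:
        return 'high'      # investigation needed
    if output_drift:
        return 'medium'    # increase monitoring
    if n_drifted_features == 1:
        return 'low'       # log and track
    return 'none'

print(classify_alert(accuracy_drop=0.08, n_drifted_features=0, output_drift=False))
```

Checks run in priority order, so a batch that trips several conditions is reported at its most severe tier only.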